Për ndërmarrjet, sfida kryesore e mbledhjes së të dhënave nuk ka qenë kurrë thjesht "sinkronizimi", por si të sigurohet saktësia, integriteti dhe kohëzgjatja e të dhënave në një mjedis të gjerë, heterogjen dhe kompleks. ky artikull zhytet në praktikën e SUPCON për të ndërtuar një kornizë të mbledhjes së të dhënave në nivel të ndërmarrjes bazuar në Apache SeaTunnel, duke u fokusuar në ndarjen e kuptimeve dhe zgjidhjeve specifike në aspekte të tilla si konfigurimi i disponueshmërisë së lartë të klasterit, optimizimi i performancës, mekanizmat e tolerancës së gabimeve dhe monitorimi i cilësisë së të dhënave. Për ndërmarrjet, sfida kryesore e mbledhjes së të dhënave nuk ka qenë kurrë thjesht "sinkronizimi", por si të sigurohet saktësia, integriteti dhe kohëzgjatja e të dhënave në një mjedis të gjerë, heterogjen dhe kompleks. ky artikull zhytet në praktikën e SUPCON për të ndërtuar një kornizë të mbledhjes së të dhënave në nivel të ndërmarrjes bazuar në Apache SeaTunnel, duke u fokusuar në ndarjen e kuptimeve dhe zgjidhjeve specifike në aspekte të tilla si konfigurimi i disponueshmërisë së lartë të klasterit, optimizimi i performancës, mekanizmat e tolerancës së gabimeve dhe monitorimi i cilësisë së të dhënave. Dilema: Arkitektura e Koleksionit Siloed dhe Kostot e Larta të Operimit dhe Mirëmbajtjes Si një kompani platformë industriale AI që fuqizon thellë industrinë e procesit, biznesi global i SUPCON-it është në zhvillim të vazhdueshëm. Aktualisht, ajo ka pothuajse 40 degë globale dhe shërben më shumë se 35.000 konsumatorë globalë. Zgjerimi i vazhdueshëm i biznesit ka vënë kërkesa më të larta për punën e të dhënave: të dhënat jo vetëm duhet të "rregullohen shpejt", por edhe të "vendosen me saktësi". Për këtë qëllim, ne kemi ndërtuar një platformë të dhënash të mëdha të ndarë nga rrjedha për t'u përballur me skenarët komplekse. megjithatë, kompleksiteti i platformës vetë ka rritur vështirësinë e mbledhjes së të dhënave, zhvillimit dhe operacionit & mirëmbajtjes, veçanërisht në : Në të kaluarën, ne për një kohë të gjatë u mbështetëm në zgjidhje të përbërë nga mjete të shumta (si përdorimi i Sqoop për sinkronizimin e të dhënave batch në HDFS, dhe Maxwell / StreamSets për të përpunuar të dhënat incrementale të bazës së të dhënave dhe t'i shkruajmë ato në Kafka / Kudu). (1) Complex Architecture with Silos Mungesa e një mekanizmi të unifikuar monitorimi dhe paralajmërimi do të thotë se çdo anomali (të tilla si vonesat e sinkronizimit, shterimi i burimeve) kërkon shumë punëtorë për zgjidhjen e problemeve dhe "zjarrfikjen", duke e bërë të vështirë sigurimin e stabilitetit. (2) O&M Black Hole, Constantly Firefighting : Kur përballemi me burime të reja të të dhënave (të tilla si bazat e të dhënave të brendshme dhe SAP HANA), duhet të gjejmë zgjidhje përshtatjeje në mjete të ndryshme ose të zhvillojmë plug-ins në mënyrë të pavarur, gjë që e bën të pamundur për t'iu përgjigjur shpejt nevojave të biznesit. (3) Segmented Capabilities, Difficult to Expand Figura e mësipërme tregon qartë ekosistemin e mbledhjes së decentralizuar më parë. Ne e kuptuam se ky model i "disorganizuar" është bërë lidhja më e ndjeshme në përpunimin e të dhënave. Jo vetëm që nuk përputhet me shpejtësinë e zhvillimit të ardhshëm të kompanisë, por gjithashtu paraqet kërcënime të mundshme për cilësinë dhe kohëzgjatjen e të dhënave. Shkatërrimi i dilemës: Mendime mbi një kornizë të unifikuar të mbledhjes dhe përzgjedhjen e teknologjisë Pas analizës dhe mendimit të thellë, ne kemi sqaruar pesë kriteret kryesore të përzgjedhjes për teknologjitë e reja: : Ajo duhet të mbulojë plotësisht të gjitha llojet aktuale dhe të ardhshme të burimeve të të dhënave të kompanisë (nga MySQL, Oracle, HANA në Kafka, StarRocks, etj.) dhe mbështet mënyrat e mbledhjes offline dhe në kohë reale, duke zgjidhur në thelb problemin e grumbullimit të unifikuar të teknologjisë. (1) Comprehensive Connectivity Framework vetë duhet të jetë një klaster i shpërndarë me disponueshmëri të lartë me tolerancë të fortë të gabimeve. Edhe në qoftë se një nyje e vetme dështon, e gjithë shërbimi nuk duhet të ndërpritet dhe mund të shërohet automatikisht, duke siguruar funksionimin e vazhdueshëm të tubacionit të mbledhjes së të dhënave. (2) Cluster Stability and High Availability Në nivelin e ekzekutimit të detyrës, duhet të sigurojë semantikën e përpunimit Exactly-Once ose At-Least-Once për të siguruar që detyrat mund të shërohen automatikisht nga pikat e ndërprerjes pas ndërprerjeve anormale, duke eliminuar dyfishimin ose humbjen e të dhënave, e cila është guri kryesor i cilësisë së të dhënave. (3) Reliable Data Consistency Guarantee Arkitektura e saj duhet të mbështesë zgjerimin horizontal, dhe performanca e sinkronizimit mund të përmirësohet në mënyrë lineare duke shtuar nyje për të përmbushur nevojat e rritjes së të dhënave të sjellë nga zhvillimi i shpejtë i biznesit. (4) Strong Throughput Performance Ajo duhet të sigurojë një mekanizëm të plotë monitorimi dhe paralajmërimi, i cili mund të gjurmojë treguesit kryesorë të tilla si çrregullime, vonesa dhe kalimi gjatë sinkronizimit të të dhënave në kohë reale, dhe të njoftojë personelin e operacionit dhe mirëmbajtjes në kohë, duke shndërruar "zjarrfikjen" pasive në "paralajmërimin e hershëm" aktiv. (5) Observable O&M Experience Bazuar në këto pesë kritere, ne kemi kryer hulumtime të thella dhe teste krahasuese në zgjidhjet kryesore në industri.Në fund, Apache SeaTunnel ka kryer performancë të shkëlqyer në të gjitha dimensionet dhe u bë zgjidhja jonë optimale për të thyer dilemën. Our Core Requirements Apache SeaTunnel's Solutions Comprehensive Connectivity It has an extremely rich Connector ecosystem, officially supporting the reading and writing of hundreds of source/destination databases, fully covering all our data types. A single framework can unify offline and real-time collection. Cluster Stability and High Availability The separated architecture of SeaTunnel Engine ensures that even if a single Master or Worker node is abnormal, it will not affect the continuity of collection tasks. Reliable Data Consistency Guarantee It provides a powerful fault tolerance mechanism, supports Exactly-Once semantics, and can realize automatic breakpoint resumption after task abnormalities through the Checkpoint mechanism, ensuring no data loss or duplication. Strong Throughput Performance It has excellent distributed data processing capabilities. Parallelism can be adjusted through simple configuration, easily realizing horizontal expansion. Observable O&M Experience It provides rich monitoring indicators and can be seamlessly integrated with mainstream monitoring and alerting systems such as Prometheus, Grafana, and AlertManager, allowing us to have a clear understanding of the data collection process. Konektiviteti i plotë Ajo ka një ekosistem Connector jashtëzakonisht të pasur, duke mbështetur zyrtarisht leximin dhe shkrimin e qindra bazave të të dhënave burim / destinacion, duke mbuluar plotësisht të gjitha llojet e të dhënave tona. Stabiliteti i klusterit dhe disponueshmëria e lartë Arkitektura e ndarë e SeaTunnel Engine siguron që edhe nëse një nyje e vetme Master ose Worker është anormale, ajo nuk do të ndikojë në vazhdimësinë e detyrave të mbledhjes. Garanci e qëndrueshmërisë së të dhënave Ai ofron një mekanizëm të fuqishëm të tolerancës së gabimeve, mbështet semantikën Exactly-Once, dhe mund të realizojë ripërtëritjen automatike të ndarjeve pas çrregullimeve të detyrës përmes mekanizmit Checkpoint, duke siguruar asnjë humbje të të dhënave ose duplikim. Performancë e fuqishme përmes Ajo ka aftësi të shkëlqyera të përpunimit të të dhënave të shpërndara. paralelizmi mund të rregullohet përmes konfigurimit të thjeshtë, duke realizuar lehtësisht zgjerimin horizontal. Përvojë O&M Ai ofron tregues të pasur monitorimi dhe mund të integrohet pa probleme me sistemet kryesore të monitorimit dhe alarmit si Prometheus, Grafana dhe AlertManager, duke na lejuar të kemi një kuptim të qartë të procesit të mbledhjes së të dhënave. Praktika: Plane të veçanta të zbatimit dhe detaje Në fazën e hershme, ne ndërtuam bazuar në Apache SeaTunnel v2.3.5. Në atë kohë, për të përmbushur disa nevoja specifike (të tilla si trajtimin e çështjeve të ndjeshmërisë së rastit të emrave të tabelave të ndryshme të bazës së të dhënave ose emrave të fushave), ne kryem disa punë të zhvillimit të mesëm. Sidoqoftë, me zhvillimin e shpejtë të komunitetit SeaTunnel, funksionet dhe konvertuesit e versionit të ri janë bërë gjithnjë e më të plotë.Kur e përmirësuam me sukses klusterin në Apache SeaTunnel v2.3.11, ishim të befasuar të gjejmë se nevojat që më parë kërkonin zhvillim të personalizuar tani janë mbështetur në mënyrë native në versionin e ri. Aktualisht, të gjitha detyrat tona të sinkronizimit të të dhënave zbatohen në bazë të versionit zyrtar, duke arritur zero modifikime, gjë që zvogëlon shumë kostot tona të mirëmbajtjes afatgjatë dhe na lejon të gëzojmë pa probleme funksionet më të fundit dhe përmirësimet e performancës të sjellura nga komuniteti. Më poshtë janë planet tona kryesore të zbatimit bazuar në versionin v2.3.11, të cilat janë verifikuar nga vëllimi i të dhënave në nivel TB në mjedisin e prodhimit dhe kanë vënë një themel të fortë për performancën e shkëlqyer të 0 dështimeve që nga ndërtimi i klusterit. 1) Planifikimi i klusterit Për të siguruar disponueshmërinë e lartë të klasterit, rekomandohet të përqendrohemi në vendosjen e një klasteri të modalitetit të veçantë. Node CPU Memory Disk JVM Heap Master-01 8C 32G 200G 30G Master-02 8C 32G 200G 30G Worker-01 16C 64G 500G 62G Worker-02 16C 64G 500G 62G Worker-03 16C 64G 500G 62G Mjeshtër-01 8C 32G 200g 30g Mjeshtër 02 8C 32G 200g 30g Punëtorë 01 16C 64G 500g 62G Punëtorë 02 16C 64G 500g 62G Punëtorët-03 16C 64G 500g 62G (2) Faqet kryesore të konfigurimit të klasterit This configuration file is mainly used to define the execution behavior, fault tolerance mechanism, and operation and maintenance monitoring settings of jobs. It optimizes performance by enabling class loading caching and dynamic resource allocation, and ensures job fault tolerance and data consistency by configuring S3-based Checkpoints. In addition, it can enable indicator collection, log management, and settings, thereby providing comprehensive support for the stable operation, monitoring, and daily management of jobs. seatunnel.yaml seatunnel: engine: # Class loader cache mode: After enabling, it can significantly improve performance when jobs are frequently started and stopped, reducing class loading overhead. It is recommended to enable it in the production environment. classloader-cache-mode: true # Expiration time of historical job data (unit: minutes): 3 days. Historical information of completed jobs exceeding this time will be automatically cleaned up. history-job-expire-minutes: 4320 # Number of data backups backup-count: 1 # Queue type: Blocking queue queue-type: blockingqueue # Execution information printing interval (seconds): Print job execution information in the log every 60 seconds. print-execution-info-interval: 60 # Job metric information printing interval (seconds): Print detailed metric information in the log every 60 seconds. print-job-metrics-info-interval: 60 slot-service: # Dynamic Slot management: After enabling, the engine will dynamically allocate computing slots based on node resource conditions, improving resource utilization. dynamic-slot: true # Checkpoint configuration. checkpoint: interval: 60000 # Time interval between two Checkpoints, in milliseconds (ms). Here it is 1 minute. timeout: 600000 # Timeout for Checkpoint execution, in milliseconds (ms). Here it is 10 minutes. storage: type: hdfs # The storage type is declared as HDFS here, and the actual storage is in the S3 below. max-retained: 3 # Maximum number of Checkpoint histories to retain. Old Checkpoints will be automatically deleted to save space. plugin-config: storage.type: s3 # The actual configured storage type is S3 (or object storage compatible with S3 protocol such as MinIO) fs.s3a.access.key: xxxxxxx # Access Key of S3-compatible storage fs.s3a.secret.key: xxxxxxx # Secret Key of S3-compatible storage fs.s3a.endpoint: http://xxxxxxxx:8060 # Service endpoint (Endpoint) address of S3-compatible storage s3.bucket: s3a://seatunel-pro-bucket # Name of the bucket used to store Checkpoint data fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider # Authentication credential provider # Observability configuration telemetry: metric: enabled: true # Enable metric collection logs: # Enable scheduled log deletion: Enable the automatic cleaning function of log files to prevent logs from filling up the disk. scheduled-deletion-enable: true # Web UI and REST API configuration http: enable-http: true # Enable Web UI and HTTP REST API services port: 8080 # Port number bound by the Web service enable-dynamic-port: false # Disable dynamic ports. Whether to enable other ports if 8080 is occupied. # The following is the Web UI basic authentication configuration enable-basic-auth: true # Enable basic identity authentication basic-auth-username: admin # Login username basic-auth-password: xxxxxxx # Login password This JVM parameter configuration file is mainly used to ensure the stability and performance of the SeaTunnel engine during large-scale data processing. It provides basic memory guarantee by setting the heap memory and metaspace capacity, and conducts a series of optimizations specifically for the G1 garbage collector to effectively manage memory garbage, control garbage collection pause time, and improve operating efficiency. jvm_master_options # JVM heap memory -Xms30g -Xmx30g # Memory overflow diagnosis: Automatically generate a Heap Dump file when OOM occurs, and save it to the specified path for subsequent analysis. -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server # Metaspace: Limit the maximum capacity to 5GB to prevent metadata from expanding infinitely and occupying too much local memory. -XX:MaxMetaspaceSize=5g # G1 garbage collector related configuration -XX:+UseG1GC # Enable G1 garbage collector -XX:+PrintGCDetails # Print detailed GC information in the log -Xloggc:/path/to/gc.log # Output GC logs to the specified file -XX:+PrintGCDateStamps # Print timestamps in GC logs -XX:MaxGCPauseMillis=5000 # The target maximum GC pause time is 5000 milliseconds (5 seconds) -XX:InitiatingHeapOccupancyPercent=50 # Start concurrent GC cycle when heap memory usage reaches 50% -XX:+UseStringDeduplication # Enable string deduplication to save memory space -XX:GCTimeRatio=4 # Set the target ratio of GC time to application time -XX:G1ReservePercent=15 # Reserve 15% of heap memory -XX:ConcGCThreads=6 # Set the number of threads used in the concurrent GC phase to 6 -XX:G1HeapRegionSize=32m # Set the G1 region size to 32MB This configuration file defines the underlying distributed architecture and collaboration mechanism of the SeaTunnel engine cluster. It is mainly used to establish and manage network communication between cluster nodes. The configuration also includes a high-precision failure detection heartbeat mechanism to ensure that node failure problems can be quickly detected and handled, ensuring the high availability of the cluster. At the same time, it enables distributed data persistence based on S3-compatible storage, reliably saving key state information to object storage. hazelcast-master.yaml (iMap stored in self-built object storage) hazelcast: cluster-name: seatunnel # Cluster name, which must be consistent across all nodes network: rest-api: enabled: true # Enable REST API endpoint-groups: CLUSTER_WRITE: enabled: true DATA: enabled: true join: tcp-ip: enabled: true # Use TCP/IP discovery mechanism member-list: # Cluster node list - 10.xx.xx.xxx:5801 - 10.xx.xx.xxx:5801 - 10.xx.xx.xxx:5802 - 10.xx.xx.xxx:5802 - 10.xx.xx.xxx:5802 port: auto-increment: false # Disable port auto-increment port: 5801 # Fixed port 5801 properties: hazelcast.invocation.max.retry.count: 20 # Maximum number of invocation retries hazelcast.tcp.join.port.try.count: 30 # Number of TCP connection port attempts hazelcast.logging.type: log4j2 # Use log4j2 logging framework hazelcast.operation.generic.thread.count: 50 # Number of generic operation threads hazelcast.heartbeat.failuredetector.type: phi-accrual # Use Phi-accrual failure detector hazelcast.heartbeat.interval.seconds: 2 # Heartbeat interval (seconds) hazelcast.max.no.heartbeat.seconds: 180 # No heartbeat timeout (seconds) hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10 # Failure detection threshold hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200 # Detection sample size hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100 # Minimum standard deviation (milliseconds) hazelcast.operation.call.timeout.millis: 150000 # Operation call timeout (milliseconds) map: engine*: map-store: enabled: true # Enable Map storage persistence initial-mode: EAGER # Load all data immediately at startup factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory # Persistence factory class properties: type: hdfs # Storage type namespace: /seatunnel/imap # Namespace path clusterName: seatunnel-cluster # Cluster name storage.type: s3 # Actually use S3-compatible storage fs.s3a.access.key: xxxxxxxxxxxxxxxx # S3 access key fs.s3a.secret.key: xxxxxxxxxxxxxxxx # S3 secret key fs.s3a.endpoint: http://xxxxxxx:8060 # S3 endpoint address s3.bucket: s3a://seatunel-pro-bucket # S3 storage bucket name fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider # Authentication provider (3) Shembuj të detyrave të mbledhjes ① MySQL-CDC to StarRocks Për të mbledhur të dhënat MySQL-CDC, është e nevojshme të sigurohemi që baza e burimit ka aktivizuar Binlog me formatin e ROW, përdoruesi ka lejet përkatëse, dhe paketa përkatëse MySQL Jar është vendosur në Për detaje, ju lutem referojuni faqes zyrtare: . ${SEATUNNEL_HOME}/lib https://seatunnel.apache.org/docs/2.3.11/connector-v2/source/MySQL-CDC Më poshtë është një konfigurim mostër për koleksionin tonë MySQL-CDC. env { parallelism = 1 # Parallelism is set to 1; only 1 is allowed for streaming collection job.mode = "STREAMING" # Streaming job mode job.name = cdh2sr # Job name identifier job.retry.times = 3 # Number of retries if the job fails job.retry.interval.seconds=180 # Retry interval (in seconds) } source { MySQL-CDC { base-url = "jdbc:mysql://xxxxxxx:3306/databasename" # MySQL connection address username = "xxxxxxr" # Database username password = "xxxxxx" # Database password table-names = ["databasename.table1","databasename_pro.table2"] # List of tables to sync (format: database.table name) startup.mode = "latest" # Start syncing from the latest position exactly_once = true # Enable Exactly-Once semantics debezium { include.schema.changes = "false" # Exclude schema changes snapshot.mode = when_needed # Take snapshots on demand } } } transform { TableRename { plugin_input = "cdc" # Input plugin identifier plugin_output = "rs" # Output plugin identifier convert_case = "LOWER" # Convert table names to lowercase prefix = "ods_cdh_databasename_" # Add prefix to table names } } sink { StarRocks { plugin_input = "rs" # Input plugin identifier (consistent with transform output) nodeUrls = ["xxxxxxx:8030","xxxxxxx:8030","xxxxxxx:8030"] # StarRocks FE node addresses base-url = "jdbc:mysql://xxxxxxx:3307" # StarRocks MySQL protocol address username = "xxxx" # StarRocks username password ="xxxxxxx" # StarRocks password database = "ods" # Target database enable_upsert_delete = true # Enable update/delete functionality max_retries = 3 # Number of retries if write fails http_socket_timeout_ms = 360000 # HTTP timeout (in milliseconds) retry_backoff_multiplier_ms = 2000 # Retry backoff multiplier max_retry_backoff_ms = 20000 # Maximum retry backoff time batch_max_rows = 2048 # Maximum number of rows per batch batch_max_bytes = 50000000 # Maximum bytes per batch } } ② Oracle-CDC to StarRocks Për të mbledhur të dhënat Oracle-CDC, sigurohuni që baza e burimit të ketë aktivizuar Logminer, përdoruesi ka lejet përkatëse, dhe vendosni paketat përkatëse OJDBC.Jar dhe Orai18n.jar në Për më shumë detaje, referojuni faqes zyrtare: . ${SEATUNNEL_HOME}/lib https://seatunnel.apache.org/docs/2.3.11/connector-v2/source/Oracle-CDC Në veçanti, në lidhje me problemet e vonesës që hasen gjatë mbledhjes së Oracle-CDC, ne rekomandojmë që së pari t'i kërkoni DBA-së të kontrollojë sa shpesh përdoruesit e logminerit ndërrohen. Rekomandimi zyrtar është që ta mbani atë rreth 10 herë në orë - ndërrimi shumë i shpeshtë mund të shkaktojë vonesë të zgjatur. Nëse frekuenca është shumë e lartë, rritni madhësinë e skedarëve të logut individual. -- Query log switch frequency SELECT GROUP#, THREAD#, BYTES/1024/1024 || 'MB' "SIZE", ARCHIVED, STATUS FROM V$LOG; SELECT TO_CHAR(first_time, 'YYYY-MM-DD HH24') AS hour, COUNT(*) AS switch_count FROM v$log_history WHERE first_time >= TRUNC(SYSDATE) - 1 -- Data from the past day GROUP BY TO_CHAR(first_time, 'YYYY-MM-DD HH24') ORDER BY hour; -- Query log file size SELECT F.MEMBER, L.GROUP#, L.THREAD#, L.SEQUENCE#, L.BYTES/1024/1024 AS SIZE_MB, L.ARCHIVED, L.STATUS, L.FIRST_CHANGE#, L.NEXT_CHANGE# FROM V$LOG L, V$LOGFILE F WHERE F.GROUP# = L.GROUP# ORDER BY L.GROUP#; Më poshtë është një konfigurim mostër për koleksionin tonë Oracle-CDC. env { parallelism = 1 # Parallelism is 1; only 1 is allowed for streaming collection job.mode = "STREAMING" # Streaming job mode job.name = bpm2sr # Job name identifier job.retry.times = 3 # Number of retries if the job fails job.retry.interval.seconds=180 # Retry interval (in seconds) } source { Oracle-CDC { plugin_output = "cdc" # Output plugin identifier base-url = "jdbc:oracle:thin:@xxxxxx:1521:DB" # Oracle connection address username = "xxxxxx" # Database username password = "xxxxxx" # Database password table-names = ["DB.SC.TABLE1","DB.SC.TABLE2"] # Tables to sync (format: database.schema.table name) startup.mode = "latest" # Start syncing from the latest position database-names = ["DB"] # Database name schema-names = ["SC"] # Schema name skip_analyze = true # Skip table analysis use_select_count = true # Use statistics exactly_once = true # Enable Exactly-Once semantics connection.pool.size = 20 # Connection pool size debezium { log.mining.strategy = "online_catalog" # Log mining strategy log.mining.continuous.mine = true # Continuously mine logs lob.enabled = false # Disable LOB support internal.log.mining.dml.parser ="legacy" # Use legacy DML parser } } } transform { TableRename { plugin_input = "cdc" # Input plugin identifier plugin_output = "rs" # Output plugin identifier convert_case = "LOWER" # Convert table names to lowercase prefix = "ods_crm_db_" # Add prefix to table names } } sink { StarRocks { plugin_input = "rs" # Input plugin identifier nodeUrls = ["xxxxxxx:8030","xxxxxxx:8030","xxxxxxx:8030"] # StarRocks FE nodes base-url = "jdbc:mysql://xxxxxxx:3307" # JDBC connection address username = "xxxx" # Username password ="xxxxxxx" # Password database = "ods" # Target database enable_upsert_delete = true # Enable update/delete max_retries = 3 # Maximum number of retries http_socket_timeout_ms = 360000 # HTTP timeout retry_backoff_multiplier_ms = 2000 # Retry backoff multiplier max_retry_backoff_ms = 20000 # Maximum retry backoff time batch_max_rows = 2048 # Maximum rows per batch batch_max_bytes = 50000000 # Maximum bytes per batch } } 4) Monitorimi i vëzhgimit Falë metrikave të fuqishme të monitorimit të ofruara nga versioni i ri i SeaTunnel dhe sistemit të plotë të monitorimit që kemi ndërtuar, ne mund të kuptojmë plotësisht statusin e platformës së mbledhjes së të dhënave nga të dy perspektivat e klusterit të gjerë dhe të nivelit të detyrave. ① Cluster Monitoring Statusi i nodes: Monitorimi në kohë reale i numrit të nodes së klasterit dhe gjendja e tyre e mbijetesës për të siguruar që nuk ka nodes anormale offline të Worker dhe për të garantuar aftësitë e përpunimit të klasterit. Shpërthimi i klasterit: Monitorimi i përgjithshëm i SourceReceivedQPS dhe SinkWriteQPS të klasterit për të kapur normat globale të hyrjes dhe daljes së të dhënave dhe për të vlerësuar ngarkesën e klasterit. Status i burimeve: Monitoroni CPU dhe kujtesën e nyjeve të klasterit për të siguruar një bazë për zgjerimin ose optimizimin e burimeve. Shëndeti i rrjetit: Siguroni kushte të mira të rrjetit të klusterit duke monitoruar rrahjet e brendshme të zemrës dhe vonesën e komunikimit. ② Task Monitoring Gjendja e funksionimit të detyrës: Kontrollimi në kohë reale i gjendjes së funksionimit (Run / Failed / Finished) të të gjitha detyrave është kërkesa më themelore e monitorimit. Vëllimi i sinkronizimit të të dhënave: Monitoroni SourceReceivedCount dhe SinkWriteCount të çdo detyre për të kapur kalimin e çdo tubacioni të të dhënave në kohë reale. Koha e vonesës: Ky është një nga treguesit më kritikë për detyrat e CDC. Paralajmërimet dërgohen kur ndodh vonesa e vazhdueshme në fund të mbledhjes. Rezultatet: Përfitime të matshme Pas një periudhe të funksionimit të qëndrueshëm, korniza e mbledhjes së të dhënave të gjeneratës së re e ndërtuar në bazë të Apache SeaTunnel na ka sjellë përfitime të konsiderueshme dhe të numërueshme, kryesisht të reflektuara në aspektet e mëposhtme: (1) Stabiliteti: Nga “Zjarri i Përhershëm” në “Paqe e Mendjes” : Under the old solution, 1-3 synchronization abnormalities needed to be handled per month. Since the new cluster was launched, core data synchronization tasks have maintained 0 failures, with no data service interruptions caused by the framework itself. Task failure rate reduced by over 99% : Relying on Apache SeaTunnel's Exactly-Once semantics and powerful Checkpoint mechanism, end-to-end Exactly-Once processing is achieved, completely solving the problem of potential trace data duplication or loss and fundamentally ensuring data quality. 100% data consistency : The high-availability design of the cluster ensures 99.99% service availability. Any single-point failure can be automatically recovered within minutes, with no impact on business operations. Significantly improved availability (2) Efikasiteti: Zhvillimi i dyfishtë dhe efikasiteti i O&M : From writing and maintaining multiple sets of scripts in the past to unified configuration-based development. The time to connect new data sources has been reduced from 1-2 person-days to within 1 minute, showing a significant efficiency improvement. 50% improvement in development efficiency : Now, the overall status can be monitored through the Grafana dashboard, with daily active O&M investment of less than 0.5 person-hours. 70% reduction in O&M costs : End-to-end data latency has been optimized from minutes to seconds, providing a solid foundation for real-time data analysis and decision-making. Optimized data timeliness (3) Arkitektura: Optimizimi i burimeve dhe korniza e unifikuar : Successfully integrated multiple technology stacks such as Sqoop and StreamSets into Apache SeaTunnel, greatly reducing technical complexity and long-term maintenance costs. Unified technology stack Perspektiva: Plane për të ardhmen : We will actively explore the native deployment and scheduling capabilities of Apache SeaTunnel on Kubernetes, leveraging its elastic scaling features to achieve on-demand allocation of computing resources, further optimizing costs and efficiency, and better embracing hybrid cloud and multi-cloud strategies. (1) Full cloud native adoption : Build AIOps capabilities based on the rich Metrics data collected, realizing intelligent prediction of task performance, automatic root cause analysis of faults, and intelligent parameter tuning. (2) Intelligent O&M 6.Përshkrimi Në të njëjtën kohë, ne gjithashtu falënderojmë çdo anëtar të ekipit të projektit të brendshëm të kompanisë – puna juaj e vështirë dhe guximi për të eksploruar janë çelësat për zbatimin e suksesshëm të këtij përmirësimi të arkitekturës. SUPCON shkarkoi mjetet e të dhënave të siloed për Apache SeaTunnel – tani detyrat kryesore të sinkronizimit drejtohen me dështim 0! 99% më pak dështime, 100% konsistencë, 70% më pak kosto O&M. Faleminderit shumë @ApacheSeaTunnel! #DataEngineering #OpenSource